package com.cmbyc.JDBC.sink

import com.cmbyc.JDBC.source.Student
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import java.sql.DriverManager

/**
 *
 * @program: com.cmbyc.JDBC.sink
 * @author: YCLW058
 * @create: 2021-05-19 14:32
 * @decsription:
 *
 * */

class Sink2 extends MySQLSink {
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/cd_mysql", "root", "root")
    val sql = "INSERT INTO `test`(`id`,`name`)VALUES (?,?)"
    ps = conn.prepareStatement(sql)

  }

  override def invoke(value: Student, context: SinkFunction.Context): Unit = {
    //获取 具体值
     ps.setInt(1, value.id)
    ps.setString(2, value.name)
    //ps.setInt(2, value.age)

    //执行
    ps.executeUpdate();

  }
}
